package main

import (
	"encoding/json"
	"errors"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

const (
	// 允许等待的写入时间
	writeWait = 10 * time.Second
	// Time allowed to read the next pong message from the peer.
	pongWait = 0
	// Send pings to peer with this period. Must be less than pongWait.
	pingPeriod = (pongWait * 9) / 10
	// Maximum message size allowed from peer.
	maxMessageSize = 40960
)

var maxConnId int64
// 用于广播
var WsConnAll map[int64]*WsConnection

var upgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	// 允许所有的CORS 跨域请求，正式环境可以关闭
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

// 客户端读写消息
type wsMessage struct {
	// websocket.TextMessage 消息类型
	messageType int
	data        []byte
}

// 客户端连接
type WsConnection struct {
	wsSocket *websocket.Conn // 底层websocket
	inChan   chan *wsMessage // 读队列
	outChan  chan *wsMessage // 写队列

	mutex             sync.Mutex // 避免重复关闭管道,加锁处理
	isClosed          bool
	closeChan         chan byte // 关闭通知
	id                int64
	lastHeartBeatTime time.Time // 上次心跳包时间
	needHeartBeat     bool      // 上次心跳
}

func init() {
	maxConnId = 0
}

// 读取消息队列中的消息
func WsHandler(resp http.ResponseWriter, req *http.Request) {
	// 应答客户端告知升级连接为websocket
	wsSocket, err := upgrader.Upgrade(resp, req, nil)
	if err != nil {
		log.Println("升级为websocket失败", err.Error())
		return
	}
	wsSocket.SetReadLimit(40960);
	maxConnId++
	// TODO 如果要控制连接数可以计算，wsConnAll长度
	// 连接数保持一定数量，超过的部分不提供服务
	wsConn := &WsConnection{
		wsSocket:          wsSocket,
		inChan:            make(chan *wsMessage, 50),
		outChan:           make(chan *wsMessage, 50),
		closeChan:         make(chan byte),
		isClosed:          false,
		id:                maxConnId,
		lastHeartBeatTime: time.Now(),
		needHeartBeat:     false,
	}
	WsConnAll[maxConnId] = wsConn
	log.Println("当前在线人数", len(WsConnAll))
	// 处理器,发送定时信息，避免意外关闭
	go wsConn.processLoop()
	// 读协程
	go wsConn.wsReadLoop()
	// 写协程
	go wsConn.wsWriteLoop()
}

// 读取消息队列中的消息
func (wsConn *WsConnection) wsRead() (*wsMessage, error) {
	tick := time.Tick(time.Second * 1)
	select {
	case msg := <-wsConn.inChan:
		// 获取到消息队列中的消息
		return msg, nil
	case <-wsConn.closeChan:
	case <-tick:
		return nil, errors.New("超时")
	}
	return nil, errors.New("连接已经关闭")
}

// 处理队列中的消息
func (this *WsConnection) PayloadParseAndCallback(dat []byte) error {
	var payload Request
	e := json.Unmarshal(dat, &payload)
	if nil != e {
		log.Print(e.Error())
		return e
	}
	log.Print(payload)
	return nil
}

// 处理队列中的消息
func (wsConn *WsConnection) processLoop() {
	// 处理消息队列中的消息
	// 获取到消息队列中的消息，处理完成后，发送消息给客户端
	for {
		msg, err := wsConn.wsRead()
		if err != nil {
			if err.Error() == "超时" {
				// log.Print(wsConn.lastHeartBeatTime.String())
				if(wsConn.needHeartBeat){
					if time.Now().Sub(wsConn.lastHeartBeatTime) > time.Second*15 {
						log.Print("心跳超时")
						wsConn.close()
					}
				}
				continue
			}
			break
		}
		ProtoCallBack(wsConn,msg.data)
	}
}
type Response struct{
	Type int `json:"type"`
	Payload interface{} `json:"data"`
}
// 发送信息
func (wsConn *WsConnection) SendPayload(ptype int64, v interface{}) error {
	var resp Response
	resp.Type = int(ptype)
	resp.Payload = v
	bytes, e := json.Marshal(resp)
	if nil != e {
		log.Print(e.Error())
	}
	wsConn.wsWrite(1, bytes)
	return nil
}

// 写入消息到队列中
func (wsConn *WsConnection) wsWrite(messageType int, data []byte) error {
	select {
		case wsConn.outChan <- &wsMessage{messageType, data}:
		case <-wsConn.closeChan:
			return errors.New("连接已经关闭")
	}
	return nil
}

// 处理消息队列中的消息
func (wsConn *WsConnection) wsReadLoop() {
	// 设置消息的最大长度
	wsConn.wsSocket.SetReadLimit(maxMessageSize)
	for {
		// 读一个message
		msgType, data, err := wsConn.wsSocket.ReadMessage()
		req := &wsMessage{
			msgType,
			data,
		}
		if err != nil {
			log.Print(err.Error())
			if(websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure)){
				log.Println("消息读取出现错误", err.Error())
			}
			wsConn.close()
			return
		}
		log.Print(string(data))
		// 放入请求队列,消息入栈
		select {
		case wsConn.inChan <- req:
		case <-wsConn.closeChan:
			return
		}
	}
}

// 发送消息给客户端
func (wsConn *WsConnection) wsWriteLoop() {
	for {
		select {
		// 取一个应答
		case msg := <-wsConn.outChan:
			// 写给websocket
			if err := wsConn.wsSocket.WriteMessage(msg.messageType, msg.data); err != nil {
				log.Println("发送消息给客户端发生错误", err.Error())
				// 切断服务
				wsConn.close()
				return
			}
		case <-wsConn.closeChan:
			// 获取到关闭通知
			return
		}
	}
}
// 关闭连接
func (wsConn *WsConnection) close() {
	log.Println("关闭连接被调用了")
	wsConn.wsSocket.Close()
	wsConn.mutex.Lock()
	
	defer wsConn.mutex.Unlock()
	if wsConn.isClosed == false {
		wsConn.isClosed = true
		// 删除这个连接的变量
		delete(WsConnAll, wsConn.id)
		close(wsConn.closeChan)
	}
	ProtoUnconnect(wsConn)
}
